package com.fulu.game.bigdata.realtime.sink

import java.lang

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.LongSerializer

/**
 * Kafka serialization schema that turns each `String` element into a record for one fixed
 * topic.
 *
 * The record key is the element's timestamp, or the current wall-clock time when no
 * timestamp is supplied, encoded with Kafka's `LongSerializer`. The record value is the
 * element encoded as UTF-8.
 *
 * @param topic name of the Kafka topic every produced record is addressed to
 */
class KafkaStringSerializationSchema(topic: String) extends KafkaSerializationSchema[String] {

  // LongSerializer is not java.io.Serializable, so it is excluded from Java serialization
  // of this schema and rebuilt lazily on first use. One instance is then reused for every
  // record instead of allocating a new serializer per element.
  @transient private lazy val keySerializer: LongSerializer = new LongSerializer()

  /**
   * Builds the Kafka record for a single element.
   *
   * @param element   the string payload; must not be null (a null element raises a
   *                  `NullPointerException` when it is encoded)
   * @param timestamp the timestamp attached to the element, or `null` when there is none
   * @return a record for `topic` whose key is the serialized timestamp and whose value is
   *         the UTF-8 bytes of `element`
   */
  override def serialize(element: String, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    // Fall back to "now" only when no timestamp was provided; getOrElse evaluates its
    // argument lazily, so the clock is not read otherwise.
    val key: lang.Long =
      Option(timestamp).getOrElse(lang.Long.valueOf(System.currentTimeMillis()))

    val keyBytes: Array[Byte] = keySerializer.serialize(topic, key)
    // Charset constant instead of a by-name lookup ("utf-8") on every record.
    val valueBytes: Array[Byte] = element.getBytes(java.nio.charset.StandardCharsets.UTF_8)

    new ProducerRecord[Array[Byte], Array[Byte]](topic, keyBytes, valueBytes)
  }
}
